package com.atguigu.flink.wordcount;

import com.atguigu.flink.function.MyWordCountFlatmapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Created by Smexy on 2023/11/7
 *
 * Unbounded stream: it has a start but no end — records keep arriving indefinitely.
 *
 * A network port continuously sends data that this Flink program consumes and counts.
 */
public class Demo3_UnBoundedStreamDemo
{
    public static void main(String[] args) throws Exception {
        // 1. Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the text sent to hadoop102:8888 and model it as an unbounded stream.
        DataStreamSource<String> socketLines = env.socketTextStream("hadoop102", 8888);

        socketLines
            // Split every incoming line into (word, 1) tuples.
            .flatMap(new MyWordCountFlatmapFunction())
            // Records are Tuple2 instances, so key the stream by the word field (f0).
            // (An anonymous KeySelector is used instead of a lambda so Flink's type
            // extraction sees the concrete generic types.)
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> tuple) throws Exception {
                    return tuple.f0;
                }
            })
            // Accumulate the second tuple field — the per-word count.
            .sum(1)
            // Emit the running totals to stdout.
            .print();

        // 3. Submit the job and start processing.
        env.execute();
    }
}
